package com.bml.architect.batch;

import com.bml.architect.utils.MqConstants;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import java.io.UnsupportedEncodingException;
import java.util.*;

/**
 * Demo producer that builds a large list of messages and sends it to the broker
 * in several smaller batches, so that no single batch exceeds a size limit.
 *
 * <p>The list is cut into sub-batches by {@link ListSplitter}; each sub-batch is
 * sent with one {@code producer.send(...)} call and its result is printed.
 */
public class SplitBatchProducer {

    /** Number of demo messages generated by {@link #main(String[])}. */
    private static final int MESSAGE_COUNT = 1000 * 1000;

    /**
     * Starts a producer, generates {@link #MESSAGE_COUNT} messages, sends them in
     * size-limited batches and finally shuts the producer down.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        DefaultMQProducer producer = new DefaultMQProducer("batchProducerGroup");
        try {
            producer.setNamesrvAddr(MqConstants.getNameServerAddr());
            producer.start();

            List<Message> messages = new ArrayList<>(MESSAGE_COUNT);
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                byte[] body = ("rocketmq batch message is " + i).getBytes(RemotingHelper.DEFAULT_CHARSET);
                messages.add(new Message("TopicTest", "TagBatch", "batch_" + i, body));
            }

            ListSplitter splitter = new ListSplitter(messages);
            while (splitter.hasNext()) {
                List<Message> batch = splitter.next();
                SendResult result = producer.send(batch);
                System.out.println(result);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (MQClientException | UnsupportedEncodingException | RemotingException | MQBrokerException e) {
            e.printStackTrace();
        } finally {
            // Always release the producer, including when start() or send() failed.
            producer.shutdown();
        }
    }

    /**
     * Iterator that walks a list of messages and hands it out as consecutive
     * sub-lists whose estimated total size does not exceed {@link #SIZE_LIMIT}.
     *
     * <p>The returned sub-lists are views produced by {@link List#subList(int, int)}
     * on the list given to the constructor; they are not copies.
     */
    static class ListSplitter implements Iterator<List<Message>> {
        /** Upper bound for the estimated size of one sub-batch. */
        private static final int SIZE_LIMIT = 1000 * 1000;
        /** Fixed amount added to every message's estimated size ("log overhead"). */
        private static final int LOG_OVERHEAD = 20;

        private final List<Message> messages;
        /** Index of the first message that has not been handed out yet. */
        private int currIndex;

        /**
         * @param messages the full list to split; must not be {@code null}
         * @throws NullPointerException if {@code messages} is {@code null}
         */
        public ListSplitter(List<Message> messages) {
            this.messages = Objects.requireNonNull(messages, "messages");
        }

        /** @return {@code true} while there are messages that were not yet returned */
        @Override
        public boolean hasNext() {
            return currIndex < messages.size();
        }

        /**
         * Returns the next sub-batch, starting at the first message not yet returned.
         *
         * <p>Messages are added while the running estimated size stays within
         * {@link #SIZE_LIMIT}. A single message whose own estimate exceeds the limit
         * is returned alone in its own sub-batch so that splitting can make progress.
         *
         * @return a non-empty sub-list view of the underlying message list
         * @throws NoSuchElementException if all messages have already been returned
         */
        @Override
        public List<Message> next() {
            if (!hasNext()) {
                throw new NoSuchElementException("No more message batches");
            }
            int nextIndex = currIndex;
            int totalSize = 0;
            for (; nextIndex < messages.size(); nextIndex++) {
                int messageSize = estimateSize(messages.get(nextIndex));
                if (messageSize > SIZE_LIMIT) {
                    // An oversized single message is unexpected. If the current batch is
                    // still empty, emit the message alone; otherwise close the current
                    // batch and let the next call deal with it.
                    if (nextIndex == currIndex) {
                        nextIndex++;
                    }
                    break;
                }
                if (messageSize + totalSize > SIZE_LIMIT) {
                    break;
                }
                totalSize += messageSize;
            }
            List<Message> subList = messages.subList(currIndex, nextIndex);
            currIndex = nextIndex;
            return subList;
        }

        /**
         * Removal is not supported.
         *
         * @throws UnsupportedOperationException always
         */
        @Override
        public void remove() {
            throw new UnsupportedOperationException("Not allowed to remove");
        }

        /**
         * Estimates the size of one message as topic length + body length + the
         * lengths of all property keys and values + {@link #LOG_OVERHEAD}.
         * String lengths are counted in {@code char}s, so this is an approximation.
         */
        private static int estimateSize(Message message) {
            int size = message.getTopic().length() + message.getBody().length;
            for (Map.Entry<String, String> entry : message.getProperties().entrySet()) {
                size += entry.getKey().length() + entry.getValue().length();
            }
            return size + LOG_OVERHEAD;
        }
    }
}
